1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.google.common.collect;
18
19 import com.google.common.util.concurrent.Uninterruptibles;
20
21 import junit.framework.TestCase;
22
23 import java.util.Collection;
24 import java.util.List;
25 import java.util.concurrent.ArrayBlockingQueue;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.Future;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.PriorityBlockingQueue;
32 import java.util.concurrent.SynchronousQueue;
33 import java.util.concurrent.TimeUnit;
34
35
36
37
38
39
40
41 public class QueuesTest extends TestCase {
42
43
44
45
46 public static List<BlockingQueue<Object>> blockingQueues() {
47 return ImmutableList.<BlockingQueue<Object>>of(
48 new LinkedBlockingQueue<Object>(),
49 new LinkedBlockingQueue<Object>(10),
50 new SynchronousQueue<Object>(),
51 new ArrayBlockingQueue<Object>(10),
52 new PriorityBlockingQueue<Object>(10, Ordering.arbitrary()));
53 }
54
55 private ExecutorService threadPool;
56
57 @Override
58 public void setUp() {
59 threadPool = Executors.newCachedThreadPool();
60 }
61
62 @Override
63 public void tearDown() throws InterruptedException {
64
65
66 threadPool.shutdown();
67 assertTrue("Some worker didn't finish in time",
68 threadPool.awaitTermination(1, TimeUnit.SECONDS));
69 }
70
71 private static <T> int drain(BlockingQueue<T> q, Collection<? super T> buffer, int maxElements,
72 long timeout, TimeUnit unit, boolean interruptibly) throws InterruptedException {
73 return interruptibly
74 ? Queues.drain(q, buffer, maxElements, timeout, unit)
75 : Queues.drainUninterruptibly(q, buffer, maxElements, timeout, unit);
76 }
77
78 public void testMultipleProducers() throws Exception {
79 for (BlockingQueue<Object> q : blockingQueues()) {
80 testMultipleProducers(q);
81 }
82 }
83
84 private void testMultipleProducers(BlockingQueue<Object> q)
85 throws InterruptedException {
86 for (boolean interruptibly : new boolean[] { true, false }) {
87 threadPool.submit(new Producer(q, 20));
88 threadPool.submit(new Producer(q, 20));
89 threadPool.submit(new Producer(q, 20));
90 threadPool.submit(new Producer(q, 20));
91 threadPool.submit(new Producer(q, 20));
92
93 List<Object> buf = Lists.newArrayList();
94 int elements = drain(q, buf, 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS, interruptibly);
95 assertEquals(100, elements);
96 assertEquals(100, buf.size());
97 assertDrained(q);
98 }
99 }
100
101 public void testDrainTimesOut() throws Exception {
102 for (BlockingQueue<Object> q : blockingQueues()) {
103 testDrainTimesOut(q);
104 }
105 }
106
107 private void testDrainTimesOut(BlockingQueue<Object> q) throws Exception {
108 for (boolean interruptibly : new boolean[] { true, false }) {
109 assertEquals(0, Queues.drain(q, ImmutableList.of(), 1, 10, TimeUnit.MILLISECONDS));
110
111
112 Future<?> submitter = threadPool.submit(new Producer(q, 1));
113
114
115 long startTime = System.nanoTime();
116
117 int drained = drain(q, Lists.newArrayList(), 2, 10, TimeUnit.MILLISECONDS, interruptibly);
118 assertTrue(drained <= 1);
119
120 assertTrue((System.nanoTime() - startTime) >= TimeUnit.MILLISECONDS.toNanos(10));
121
122
123 submitter.get();
124 if (drained == 0) {
125 assertNotNull(q.poll());
126 }
127 }
128 }
129
130 public void testZeroElements() throws Exception {
131 for (BlockingQueue<Object> q : blockingQueues()) {
132 testZeroElements(q);
133 }
134 }
135
136 private void testZeroElements(BlockingQueue<Object> q) throws InterruptedException {
137 for (boolean interruptibly : new boolean[] { true, false }) {
138
139 assertEquals(0, drain(q, ImmutableList.of(), 0, 10, TimeUnit.MILLISECONDS, interruptibly));
140 }
141 }
142
143 public void testEmpty() throws Exception {
144 for (BlockingQueue<Object> q : blockingQueues()) {
145 testEmpty(q);
146 }
147 }
148
149 private void testEmpty(BlockingQueue<Object> q) {
150 assertDrained(q);
151 }
152
153 public void testNegativeMaxElements() throws Exception {
154 for (BlockingQueue<Object> q : blockingQueues()) {
155 testNegativeMaxElements(q);
156 }
157 }
158
159 private void testNegativeMaxElements(BlockingQueue<Object> q) throws InterruptedException {
160 threadPool.submit(new Producer(q, 1));
161
162 List<Object> buf = Lists.newArrayList();
163 int elements = Queues.drain(q, buf, -1, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
164 assertEquals(elements, 0);
165 assertTrue(buf.isEmpty());
166
167
168
169 Queues.drain(q, buf, 1, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
170 }
171
172 public void testDrain_throws() throws Exception {
173 for (BlockingQueue<Object> q : blockingQueues()) {
174 testDrain_throws(q);
175 }
176 }
177
178 private void testDrain_throws(BlockingQueue<Object> q) {
179 threadPool.submit(new Interrupter(Thread.currentThread()));
180 try {
181 Queues.drain(q, ImmutableList.of(), 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
182 fail();
183 } catch (InterruptedException expected) {
184 }
185 }
186
187 public void testDrainUninterruptibly_doesNotThrow() throws Exception {
188 for (BlockingQueue<Object> q : blockingQueues()) {
189 testDrainUninterruptibly_doesNotThrow(q);
190 }
191 }
192
193 private void testDrainUninterruptibly_doesNotThrow(final BlockingQueue<Object> q) {
194 final Thread mainThread = Thread.currentThread();
195 threadPool.submit(new Runnable() {
196 public void run() {
197 new Producer(q, 50).run();
198 new Interrupter(mainThread).run();
199 new Producer(q, 50).run();
200 }
201 });
202 List<Object> buf = Lists.newArrayList();
203 int elements =
204 Queues.drainUninterruptibly(q, buf, 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
205
206 assertTrue(Thread.interrupted());
207 assertEquals(100, elements);
208 assertEquals(100, buf.size());
209 }
210
211 public void testNewLinkedBlockingDequeCapacity() {
212 try {
213 Queues.newLinkedBlockingDeque(0);
214 fail("Should have thrown IllegalArgumentException");
215 } catch (IllegalArgumentException expected) {
216
217 }
218 assertEquals(1, Queues.newLinkedBlockingDeque(1).remainingCapacity());
219 assertEquals(11, Queues.newLinkedBlockingDeque(11).remainingCapacity());
220 }
221
222 public void testNewLinkedBlockingQueueCapacity() {
223 try {
224 Queues.newLinkedBlockingQueue(0);
225 fail("Should have thrown IllegalArgumentException");
226 } catch (IllegalArgumentException expected) {
227
228 }
229 assertEquals(1, Queues.newLinkedBlockingQueue(1).remainingCapacity());
230 assertEquals(11, Queues.newLinkedBlockingQueue(11).remainingCapacity());
231 }
232
233
234
235
236 private void assertDrained(BlockingQueue<Object> q) {
237 assertNull(q.peek());
238 assertInterruptibleDrained(q);
239 assertUninterruptibleDrained(q);
240 }
241
242 private void assertInterruptibleDrained(BlockingQueue<Object> q) {
243
244 try {
245 assertEquals(0, Queues.drain(q, ImmutableList.of(), 0, 10, TimeUnit.MILLISECONDS));
246 } catch (InterruptedException e) {
247 throw new AssertionError();
248 }
249
250
251 threadPool.submit(new Interrupter(Thread.currentThread()));
252 try {
253
254 Queues.drain(q, Lists.newArrayList(), 1, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
255 fail();
256 } catch (InterruptedException expected) {
257
258 }
259 }
260
261
262 private void assertUninterruptibleDrained(BlockingQueue<Object> q) {
263 assertEquals(0,
264 Queues.drainUninterruptibly(q, ImmutableList.of(), 0, 10, TimeUnit.MILLISECONDS));
265
266
267 threadPool.submit(new Interrupter(Thread.currentThread()));
268
269 long startTime = System.nanoTime();
270 Queues.drainUninterruptibly(
271 q, Lists.newArrayList(), 1, 10, TimeUnit.MILLISECONDS);
272 assertTrue((System.nanoTime() - startTime) >= TimeUnit.MILLISECONDS.toNanos(10));
273
274 while (!Thread.interrupted()) { Thread.yield(); }
275 }
276
277 private static class Producer implements Runnable {
278 final BlockingQueue<Object> q;
279 final int elements;
280
281 Producer(BlockingQueue<Object> q, int elements) {
282 this.q = q;
283 this.elements = elements;
284 }
285
286 @Override public void run() {
287 try {
288 for (int i = 0; i < elements; i++) {
289 q.put(new Object());
290 }
291 } catch (InterruptedException e) {
292
293
294 e.printStackTrace();
295
296 Uninterruptibles.sleepUninterruptibly(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
297 }
298 }
299 }
300
301 private static class Interrupter implements Runnable {
302 final Thread threadToInterrupt;
303
304 Interrupter(Thread threadToInterrupt) {
305 this.threadToInterrupt = threadToInterrupt;
306 }
307
308 @Override public void run() {
309 try {
310 Thread.sleep(100);
311 } catch (InterruptedException e) {
312 throw new AssertionError();
313 } finally {
314 threadToInterrupt.interrupt();
315 }
316 }
317 }
318 }